Solutions/Cloudflare/Data Connectors/AzureFunctionCloudflare/main.py (160 lines of code) (raw):

# This is function version 2.0.0 supporting python > 3.9
import os
import asyncio
from azure.storage.blob.aio import ContainerClient
import json
import logging
import azure.functions as func
import re
import time
import aiohttp
from json import JSONDecodeError
import traceback
from .sentinel_connector_async import AzureSentinelConnectorAsync

logging.getLogger(
    'azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)
logging.getLogger('charset_normalizer').setLevel(logging.ERROR)

MAX_SCRIPT_EXEC_TIME_MINUTES = 5

AZURE_STORAGE_CONNECTION_STRING = os.environ['AZURE_STORAGE_CONNECTION_STRING']
CONTAINER_NAME = os.environ['CONTAINER_NAME']
WORKSPACE_ID = os.environ['WORKSPACE_ID']
SHARED_KEY = os.environ['SHARED_KEY']
LOG_TYPE = 'Cloudflare'
LINE_SEPARATOR = os.environ.get(
    'lineSeparator', '[\n\r]+')

# Defines how many files can be processed simultaneously
MAX_CONCURRENT_PROCESSING_FILES = int(
    os.environ.get('MAX_CONCURRENT_PROCESSING_FILES', 5))

# Defines page size while listing files from blob storage. New page is not processed while old page is processing.
MAX_PAGE_SIZE = int(MAX_CONCURRENT_PROCESSING_FILES * 20)

# Defines max number of events that can be sent in one request to Azure Sentinel
MAX_BUCKET_SIZE = int(os.environ.get('MAX_BUCKET_SIZE', 2000))

# Defines max chunk download size for blob storage in MB
MAX_CHUNK_SIZE_MB = int(os.environ.get('MAX_CHUNK_SIZE_MB', 30))

LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')

if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
    LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'

pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure\.([a-zA-Z\.]+)$'
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
    raise Exception("Invalid Log Analytics Uri.")


# Timer-triggered entry point: lists blobs in the configured container and
# forwards their contents to Azure Sentinel, stopping early if the function
# approaches its execution time limit.
async def main(mytimer: func.TimerRequest):
    try:
        logging.info('Starting script')
        logging.info('Concurrency parameters: MAX_CONCURRENT_PROCESSING_FILES {}, MAX_PAGE_SIZE {}, MAX_BUCKET_SIZE {}.'.format(
            MAX_CONCURRENT_PROCESSING_FILES, MAX_PAGE_SIZE, MAX_BUCKET_SIZE))
        conn = AzureBlobStorageConnector(
            AZURE_STORAGE_CONNECTION_STRING, CONTAINER_NAME, MAX_CONCURRENT_PROCESSING_FILES)
        container_client = conn._create_container_client()
        async with container_client:
            async with aiohttp.ClientSession() as session:
                cors = []
                async for blob in conn.get_blobs():
                    try:
                        cor = conn.process_blob(blob, container_client, session)
                        cors.append(cor)
                    except Exception as e:
                        logging.error(f'Exception in processing blob is {e}')
                    if len(cors) >= MAX_PAGE_SIZE:
                        await asyncio.gather(*cors)
                        cors = []
                    if conn.check_if_script_runs_too_long():
                        logging.info(
                            'Script is running too long. Stop processing new blobs.')
                        break
                if cors:
                    await asyncio.gather(*cors)
                    logging.info('Processed {} files with {} events.'.format(
                        conn.total_blobs, conn.total_events))
        logging.info('Script finished. Processed files: {}. Processed events: {}'.format(
            conn.total_blobs, conn.total_events))
    except Exception as ex:
        logging.error('An error occurred in the main script: {}'.format(str(ex)))
        logging.error(traceback.format_exc())


# Reads Cloudflare log blobs from Azure Blob Storage, forwards their events to
# Azure Sentinel and deletes each blob once it has been processed.
class AzureBlobStorageConnector:
    def __init__(self, conn_string, container_name, max_concurrent_processing_files=10):
        self.__conn_string = conn_string
        self.__container_name = container_name
        self.semaphore = asyncio.Semaphore(max_concurrent_processing_files)
        self.script_start_time = int(time.time())
        self.total_blobs = 0
        self.total_events = 0

    def _create_container_client(self):
        try:
            return ContainerClient.from_connection_string(
                self.__conn_string,
                self.__container_name,
                logging_enable=False,
                max_single_get_size=MAX_CHUNK_SIZE_MB*1024*1024,
                max_chunk_get_size=MAX_CHUNK_SIZE_MB*1024*1024)
        except Exception as ex:
            logging.error('An error occurred in _create_container_client: {}'.format(str(ex)))
            logging.error(traceback.format_exc())
            return None

    async def get_blobs(self):
        try:
            container_client = self._create_container_client()
            logging.info("inside get_blobs function")
            async with container_client:
                async for blob in container_client.list_blobs():
                    if 'ownership-challenge' not in blob['name']:
                        yield blob
        except Exception as ex:
            logging.error(f'An error occurred in get_blobs: {ex}')
            logging.error(traceback.format_exc())

    def check_if_script_runs_too_long(self):
        now = int(time.time())
        duration = now - self.script_start_time
        max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.85)
        return duration > max_duration

    async def delete_blob(self, blob, container_client):
        try:
            logging.info("inside delete_blob function...")
            logging.info("Deleting blob {}".format(blob['name']))
            await container_client.delete_blob(blob['name'])
        except Exception as ex:
            logging.error(f'An error occurred while deleting blob {blob["name"]}: {ex}')
            logging.error(traceback.format_exc())

    async def process_blob(self, blob, container_client, session: aiohttp.ClientSession):
        try:
            async with self.semaphore:
                logging.info("Start processing {}".format(blob['name']))
                try:
                    sentinel = AzureSentinelConnectorAsync(
                        session, LOG_ANALYTICS_URI, WORKSPACE_ID, SHARED_KEY, LOG_TYPE, queue_size=MAX_BUCKET_SIZE)
                    blob_cor = await container_client.download_blob(blob['name'], encoding="utf-8")
                except Exception as e:
                    logging.error(f'Error while connecting to Sentinel or downloading blob: {e}')
                    logging.error(traceback.format_exc())
                s = ''
                async for chunk in blob_cor.chunks():
                    s += chunk.decode()
                    lines = re.split(r'{0}'.format(LINE_SEPARATOR), s)
                    for n, line in enumerate(lines):
                        # Only lines that end at a separator are complete; parse and send them.
                        if n < len(lines) - 1:
                            if line:
                                try:
                                    event = json.loads(line)
                                except JSONDecodeError as je:
                                    logging.error('JSONDecode error while loading json event at line value {}. blob name: {}. Error {}'.format(
                                        line, blob['name'], str(je)))
                                    raise je
                                except ValueError as e:
                                    logging.error('Error while loading json Event at line value {}. blob name: {}. Error: {}'.format(
                                        line, blob['name'], str(e)))
                                    raise e
                                await sentinel.send(event)
                    # Carry the trailing (possibly incomplete) fragment into the next chunk.
                    s = line
                # Parse whatever fragment remains after the final chunk.
                if s:
                    try:
                        event = json.loads(s)
                    except JSONDecodeError as je:
                        logging.error('JSONDecode error while loading json event at s value {}. blob name: {}. Error {}'.format(
                            s, blob['name'], str(je)))
                        raise je
                    except ValueError as e:
                        logging.error('Error while loading json Event at s value {}. blob name: {}. Error: {}'.format(
                            s, blob['name'], str(e)))
                        raise e
                    await sentinel.send(event)
                await sentinel.flush()
                await self.delete_blob(blob, container_client)
                self.total_blobs += 1
                self.total_events += sentinel.successfull_sent_events_number
                logging.info("Finish processing {}. Sent events: {}".format(
                    blob['name'], sentinel.successfull_sent_events_number))
                if self.total_blobs % 100 == 0:
                    logging.info('Processed {} files with {} events.'.format(
                        self.total_blobs, self.total_events))
        except Exception as ex:
            logging.error(f"Error in process_blob is {ex}")